package com.jplus.plugins.rpc.core;

import io.netty.util.internal.ThreadLocalRandom;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.jplus.framework.core.CharSet;
import com.jplus.framework.util.FormatUtil;
import com.jplus.plugins.rpc.Constant;
import com.jplus.plugins.rpc.bean.KeyNode;
import com.jplus.plugins.rpc.bean.ZNode;

/**
 * ZooKeeper Java Api 使用样例<br>
 */
public class ZkFactory {

	private static final Logger logger = LoggerFactory.getLogger(ZkFactory.class);
	private static ZooKeeper zk = null;
	private static CountDownLatch latch = new CountDownLatch(1);

	private static Map<String, List<ZNode>> dataAllList = new HashMap<String, List<ZNode>>();

	/**
	 * 【消费者/提供者】循环创建节点
	 */
	public static void createPathLine(String path) {
		String[] temp = path.substring(1).split("/");
		String node = "";
		for (int i = 0; i < temp.length; i++) {
			node += "/" + temp[i];
			if (!exists(node)) {
				if (i < temp.length - 1)
					createPath(node, "", CreateMode.PERSISTENT);
				else
					createPath(node, "", CreateMode.EPHEMERAL);
			}
		}
	}

	/**
	 * 【消费者】获取观察节点
	 */
	public static void watchNode() {
		final ZooKeeper zk = getZK();
		if (zk != null) {
			try {
				List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
					@Override
					public void process(WatchedEvent event) {
						if (event.getType() == Event.EventType.NodeChildrenChanged) {
							watchNode();
						}
					}
				});
				Map<String, List<ZNode>> dataList = new HashMap<String, List<ZNode>>();
				for (String node : nodeList) {
					List<String> serverList = zk.getChildren(Constant.ZK_REGISTRY_PATH + "/" + node + "/" + Constant.PROVIDERS, new Watcher() {
						@Override
						public void process(WatchedEvent event) {
							if (event.getType() == Event.EventType.NodeChildrenChanged) {
								watchNode();
							}
						}
					});
					for (String ser : serverList) {
						ZNode tz = JSON.parseObject(URLDecoder.decode(ser, CharSet.Default), ZNode.class);
						String keyNode = new KeyNode(tz).MD5();
						List<ZNode> tlist = dataList.get(keyNode);
						if (tlist == null)
							tlist = new ArrayList<ZNode>();
						tlist.add(tz);
						dataList.put(keyNode, tlist);
					}
				}
				logger.info("[ZK] node data: {}", dataList);
				dataAllList = dataList;
			} catch (KeeperException | InterruptedException e) {
				logger.error("获取Zookeeper节点失败：", e);
			} catch (UnsupportedEncodingException e) {
				logger.error("URLDecoder.decode失败：", e);
			}
		}
	}

	/**
	 * 【消费者】获取一个服务提供者
	 */
	public static ZNode discover(ZNode cliNode) {
		ZNode data = null;
		String key = new KeyNode(cliNode).MD5();
		List<ZNode> dataList = dataAllList.get(key);
		if (dataList != null && dataList.size() > 0) {
			int size = dataList.size();
			if (size == 1) {
				data = dataList.get(0);
			} else {
				data = dataList.get(ThreadLocalRandom.current().nextInt(size));// 随机
				// 。。。权重，优先级。。
			}
			logger.info("using rpc info: {}", data);
		}
		return data;
	}

	// ====================================================================
	public static ZooKeeper getZK() {
		return zk;
	}

	/**
	 * 创建ZK连接
	 * 
	 * @param connectString
	 *            ZK服务器地址列表
	 * @param sessionTimeout
	 *            Session超时时间
	 */
	public static void createConnection(String connectString, int sessionTimeout) {
		if (zk == null)
			try {
				zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
					@Override
					public void process(WatchedEvent event) {
						if (KeeperState.SyncConnected == event.getState()) {
							latch.countDown();
						}
					}
				});
				latch.await();
			} catch (Exception e) {
				logger.error("连接创建失败,connectString=" + connectString, e);
				e.printStackTrace();
			}
	}

	/**
	 * 关闭ZK连接
	 */
	public static void close() {
		if (!FormatUtil.isEmpty(zk)) {
			try {
				zk.close();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * 获取所有子节点
	 */
	public static List<String> getChildren(String path) {
		try {
			return zk.getChildren(path, false);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return new ArrayList<String>();
	}

	/**
	 * 创建节点
	 */
	public static boolean createPath(String path, String data, CreateMode mode) {
		try {
			logger.info("节点创建成功, Path: " + zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, mode) + ", content: " + data);
			return true;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * 读取指定节点数据内容
	 */
	public static String readData(String path) {
		try {
			return new String(zk.getData(path, false, null));
		} catch (Exception e) {
			e.printStackTrace();
			return "";
		}
	}

	/**
	 * 更新指定节点数据内容
	 */
	public static boolean writeData(String path, String data) {
		try {
			logger.info("更新数据成功，path：" + path + ", stat: " + zk.setData(path, data.getBytes(), -1));
			return true;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * 删除指定节点
	 */
	public static void deleteNode(String path) {
		try {
			zk.delete(path, -1);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 判断节点是否存在
	 */
	public static boolean exists(String path) {
		try {
			Stat stat = zk.exists(path, false);
			return stat == null ? false : true;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	public static Map<String, List<ZNode>> getDataAllList() {
		return dataAllList;
	}

}